package org.spider.core;

import com.alibaba.ttl.TtlRunnable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spider.api.concurrent.*;
import org.spider.api.concurrent.SpiderThreadPoolExecutorManager.SubThreadPool;
import org.spider.api.context.SpiderContext;
import org.spider.api.context.SpiderContextHolder;
import org.spider.api.domain.model.SpiderFlow;
import org.spider.api.domain.utilDomain.SpiderNode;
import org.spider.api.enums.NoticeType;
import org.spider.api.executor.ShapeExecutor;
import org.spider.api.listener.TaskListener;
import org.spider.api.utils.flow.SpiderFlowUtils;
import org.spider.core.service.NoticeService;
import org.spider.core.utils.ExecutorsUtils;
import org.spider.core.utils.ExpressionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * executeRoot管理一个任务的开始，以及对完成的线程提取信息，执行后续的线程<br>
 * <p>
 * executeNode执行一个节点，如果有循环，那么就多次提交，这个函数的功能就是执行节点，然后将执行的线程提交到队列中<br>
 * {@link SpiderTask} <br>
 * {@linkplain SpiderFutureTask}
 */
@Component
public class Spider {
    @Value("${spider.thread.max:32}")
    private Integer maxThreads;
    @Value("${spider.thread.sub-pool:16}")
    private Integer subPoolSize;
    @Value("${spider.detect.dead-cycle:20000}")
    private Integer deadCycle;
    public static SpiderThreadPoolExecutorManager executor;
    @Resource
    private NoticeService noticeService;
    @Resource
    private List<TaskListener> listeners;

    private static final Logger logger = LoggerFactory.getLogger(Spider.class);
    private static final String ATOMIC_DEAD_CYCLE = "__atomic_dead_cycle";

    @PostConstruct
    private void initInstances() {
        executor = new SpiderThreadPoolExecutorManager(maxThreads);
        logger.info("主线程池创建成功，最大线程数为:{}",maxThreads);
    }

    public void run(SpiderFlow spiderFlow, SpiderContext context) {
        run(spiderFlow, context, new HashMap<>());
    }

    public void run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables) {
        variables = variables == null ? new HashMap<>() : variables;
        SpiderNode root = SpiderFlowUtils.loadJsonFromString(spiderFlow.getConfig());
        //开始通知
        noticeService.sendNotice(spiderFlow, NoticeType.start);
        executeRoot(root, context, variables);
        //结束通知
        noticeService.sendNotice(spiderFlow,NoticeType.end);
    }

    public void runTest(SpiderNode root,SpiderContext context){
        //上下文变量存储到ThreadLocal,用在日志打印中
        SpiderContextHolder.set(context);
        executeRoot(root,context,new HashMap<>());
        SpiderContextHolder.remove();
    }

    private void beforeStart() {
        if (listeners == null || listeners.size()==0) return;
        listeners.forEach(TaskListener::beforeListener);
    }

    private void afterEnd() {
        if (listeners == null || listeners.size()==0) return;
        listeners.forEach(TaskListener::afterListener);
    }

    private int compareNodeFromFuture(Future<?> o1, Future<?> o2, Comparator<SpiderNode> cmp) {
        try {
            return cmp.compare(
                    ((SpiderTask) o1.get()).node,
                    ((SpiderTask) o2.get()).node);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 从根节点运行<br>
     * 初始化Context类<br>
     * 线程池执行,一次task定义。<br>
     * 从根节点开始，将每个节点都转换成一个SpiderFutureTask，提交到子线程池中<br>
     * 子线程池限制任务的提交，以及在提交任务后返回spiderFutureTask对象。将其添加到队列中，<br>
     * 有任务完成时，会根据比较器从该完成的任务中获取最大的future对象，利用get()方法获取task对象，<br>
     * 从中可以获取spiderNode,对应的shapeExecutor,然后根据条件确定是否执行以下节点<br>
     *
     * @param root
     * @param context
     * @param variables
     */
    public void executeRoot(SpiderNode root, SpiderContext context, Map<String, Object> variables) {
        StrategyQueue strategyQueue = new RandomThreadSubmitStrategy();
        //至少两个线程---当前执行+下一级执行
        int _subPoolSize = Math.max(subPoolSize, 2);
        SubThreadPool taskPool = executor.createSubThreadExecutor(_subPoolSize, strategyQueue);
        taskPool.start();
        logger.info("子线程池,创建成功,子线程池大小为:{}",_subPoolSize);
        //Context准备
        context.setThreadPool(taskPool);
        //执行前listeners
        logger.info("设置运行前监听器");
        this.beforeStart();
        Comparator<SpiderNode> comparator = strategyQueue.comparator();
        Runnable r = TtlRunnable.get(() -> {
            //执行节点
            this.executeNode(null, root, context, variables);
            logger.info("Root执行，线程开始[拉取提交模式]");
            Queue<Future<?>> queue = context.getFutureQueue();
            logger.debug("queue是否为empty:{}", queue.size());
            //最外部的循环
            while (!queue.isEmpty()) {
                try {
                    //根据比较器获取最先完成的任务(因为最先完成的任务的后续任务是优先的)
                    Optional<Future<?>> first = queue.stream().filter(Future::isDone)
                            .max((o1, o2) -> compareNodeFromFuture(o1, o2, comparator));
                    //  logger.info("尝试拉出一个运行完的节点");
                    //获取出最大的对象(如果存在的话)
                    if (first.isPresent()) {
                        logger.debug("拉出一个已运行完的节点");
                        Future<?> f = first.get();
                        queue.remove(f);
                        if (context.isRunning()) {
                            //获取result
                            SpiderTask task = (SpiderTask) f.get();
                            //这边注意要用这个Task的变量，而不是上面一开始创建的变量
                            this.executeNextNodes(task.node, context, task.variables);
                        }
                    }
                    Thread.sleep(1);
                } catch (Exception e) {
                    //日志
                    logger.error("spider发生异常...");
                }
            }
            //执行结束后listeners
            this.afterEnd();
        });
        //根节点提交执行任务
        logger.info("提交循环查询任务队列线程");
        Future<Object> future = taskPool.submitAsync(r, null, null); //root并不需要传SpiderTask，因为没有变量
        //等待线程池结束
        try {
            logger.info("开始等待任务运行结束");
            future.get();//阻塞等待所有任务执行完毕,即该任务完成
            taskPool.interrupt();
            logger.info("Spider任务队列执行完毕,{}",taskPool.isAlive());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 当前的node中可能有循环变量 loopVariable,那么这个变量名是在当前的循环中就获取好的
     *
     * @param node
     * @param context
     * @param variables
     */
    public void executeNode(SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
        String shape = node.getShape();
        //判断箭头上的条件，如果不成立则不执行
        if (!executeCondition(fromNode, node, variables, context)) {
            return;
        }
        logger.debug("执行节点[{}:{}]", node.getNodeName(), node.getNodeId());
        //找到对应的执行器
        ShapeExecutor executor = ExecutorsUtils.get(shape);
        if (executor == null) {
            logger.error("执行失败,找不到对应的执行器:{}", shape);
            context.setRunning(false);
        }
        int loopCount = 1;    //循环次数默认为1,如果节点有循环属性且填了循环次数/集合,则取出循环次数
        int loopStart = 0;    //循环起始位置
        String loopCountStr = node.getStringByKey(ShapeExecutor.LOOP_COUNT);
        boolean isLoop = StringUtils.isNotBlank(loopCountStr);
        if (isLoop) {
            try {
                //执行表达式，看看执行多少次
                Object loopCountObj = ExpressionUtils.execute(loopCountStr, variables);
                loopCount = NumberUtils.toInt(loopCountObj.toString());
                logger.debug("获取循环次数{}={}", loopCountStr, loopCount);
            } catch (Throwable t) {
                loopCount = 0;
                logger.error("获取循环次数失败,异常信息:{}", t);
            }
        }

        if (loopCount > 0) {
            //获取循环下标的变量名称
            String loopVariableName = node.getStringByKey(ShapeExecutor.LOOP_VARIABLE_NAME);
            List<SpiderTask> tasks = new ArrayList<>();
            for (int i = loopStart; i < loopCount; i++) {
                logger.debug("循环索引:{} ,循环次数:{}",i,loopCount);
                if (context.isRunning()) {
                    // 传递变量,创建副本，是为了防止父节点的键值对被覆盖
                    Map<String, Object> variablesCopy = new HashMap<>(variables);
                    if (isLoop) {
                        // 存入下标变量
                        if (!StringUtils.isBlank(loopVariableName)) {
                            variablesCopy.put(loopVariableName, i);
                        }
                    }
                    logger.debug("当前节点:{},传递变量名==========>>>{}",node.getNodeName(),variablesCopy.keySet());
                    // 父子线程之间间传递ThreadLocal
                    Runnable runnable = TtlRunnable.get(() -> {
                        if (context.isRunning()) {
                            try {
                                //死循环检测，当执行节点次数大于阈值时，结束本次测试  todo
                                AtomicInteger executeCount = context.get(ATOMIC_DEAD_CYCLE);
                                if (executeCount != null && executeCount.incrementAndGet() > deadCycle) {
                                    context.setRunning(false);
                                    return;
                                }
                                //执行节点具体逻辑
                                executor.execute(node, context, variablesCopy);
                                //当未发生异常时，移除ex变量
                                variablesCopy.remove("ex");
                            } catch (Throwable t) {
                                variablesCopy.put("ex", t);
                                logger.error("执行节点[{}:{}]出错,异常信息：{}", node.getNodeName(), node.getNodeId(), t);
                            }
                        }
                    });
                    tasks.add(new SpiderTask(node, runnable, variablesCopy));
                }
            }
            LinkedBlockingQueue<Future<?>> futureQueue = context.getFutureQueue();
            for (SpiderTask task : tasks) {
                if (executor.isThread()) {    //判断节点是否是异步运行
                    //提交任务至线程池中,并将Future添加到队列末尾
                    Future<SpiderTask> spiderTaskFuture = context.getThreadPool().submitAsync(task.r, task, node);
                    futureQueue.add(spiderTaskFuture);
                } else {
                    FutureTask<SpiderTask> futureTask = new FutureTask<>(task.r, task);
                    futureTask.run();   //直接执行
                    futureQueue.add(futureTask);
                }
            }
        }
    }


    private void executeNextNodes(SpiderNode node, SpiderContext context, Map<String, Object> variables) {
        List<SpiderNode> nextNodes = node.getNextNodes();
        if (nextNodes == null) return;
        for (SpiderNode nextNode : nextNodes) {
            executeNode(node, nextNode, context, variables);
        }
    }


    private boolean executeCondition(SpiderNode fromNode, SpiderNode node, Map<String, Object> variables, SpiderContext context) {
        if (fromNode != null) {
            //上一个节点执行结束后，有异常
            boolean hasException = variables.get("ex") != null;
            return !hasException;
//            String exceptionFlow = node.getExceptionFlow(fromNode.getNodeId());
//            //当出现异常流转 : 1
//            //未出现异常流转 : 2
//            if (("1".equalsIgnoreCase(exceptionFlow) && !hasException)
//                    || ("2".equalsIgnoreCase(exceptionFlow) && hasException)) {
//                return false;
//            }

//            todo  流转条件判断
//            String condition = node.getCondition(fromNode.getNodeId());
//            if (StringUtils.isNotBlank(condition)) { // 判断是否有条件
//                Object result = null;
//                try {
//                    result = ExpressionUtils.execute(condition, variables);
//                } catch (Exception e) {
//                    logger.error("判断{}出错,异常信息：{}", condition, e);
//                }
//                if (result != null) {
//                    boolean isContinue = "true".equals(result) || Objects.equals(result, true);
//                    logger.debug("判断{}={}", condition, isContinue);
//                    return isContinue;
//                }
//                return false;
//            }
        }
        return true;
    }


    public static void main(String[] args) throws Exception {
        Runnable r = () -> {
            System.out.println("task--");
        };
        //        Integer result = 1;
        //        SpiderFutureTask<?> spiderFutureTask = new SpiderFutureTask(r, result, null, null);
        // 验证executeRoot的执行逻辑

        SpiderThreadPoolExecutorManager manager = new SpiderThreadPoolExecutorManager(8);
        SubThreadPool executor = manager.createSubThreadExecutor(8, new SeqStrategyQueue());
        SpiderNode spiderNode = new SpiderNode();
        spiderNode.setNodeId("001");
        SpiderNode node2 = new SpiderNode();
        node2.setNodeId("002");
        Future<?> f = executor.submitAsync(r, node2, spiderNode);
        Queue<Future<?>> queue = new LinkedList<>();
        queue.add(f);
        while (!queue.isEmpty()) {
            Optional<Future<?>> first = queue.stream().filter(Future::isDone).findFirst();
            if (first.isPresent()) {
                Future<?> future = first.get();
                System.out.println(future);
                queue.remove(future);
                System.out.println(future.get());
            }
            try {
                Thread.sleep(1000);
                System.out.println("wait...");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        System.out.println(queue);
        Spider spider = new Spider();
        System.out.println(spider);

    }

    /**
     * 封装node,runnable,shapeExecutor,以及必要的variables变量
     */
    static class SpiderTask {
        SpiderNode node;
        Runnable r;
        Map<String, Object> variables;

        public SpiderTask(SpiderNode node, Runnable r, Map<String, Object> variables) {
            this.node = node;
            this.r = r;
            this.variables = variables;
        }
    }

}
